Conversation
@vaquarkhan let us know when it's ready for review

It's ready for review. Please review the pull request; it's waiting for approval @skrawcz

@vaquarkhan any screenshots of things in action? Also we'll need to add to docs/ for this -- mind adding that part?

Let me create a Google doc for the documentation and add all screenshots.
```python
class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates via SQS.

    BIP-0042: This mixin enables backends to receive real-time notifications
    from SQS instead of polling S3 for new files.
    """

    @abc.abstractmethod
    async def start_sqs_consumer(self):
        """Start the SQS consumer for event-driven tracking.

        This method should run indefinitely, processing S3 event notifications
        from the configured SQS queue.
        """
        pass
```
Let's remove SQS from this and make it more generic in naming.
PR Review: Cloud Native AWS (BIP-042)
Thanks for the contribution! The overall direction of adding cloud-native AWS support is valuable. However, there are several concerns that should be addressed before merging — primarily around vendor neutrality, code quality, and architecture. As an Apache project, keeping the core framework vendor-neutral is critical.
Summary
| # | Severity | Issue |
|---|---|---|
| 1 | Blocker | start_sqs_consumer in the core backend mixin leaks AWS vendor concepts into the abstraction layer |
| 2 | Blocker | terraform/ at repo root with .terraform.lock.hcl committed — should be under examples/ or contrib/ |
| 3 | High | Bedrock integration is orthogonal to S3/SQS tracking — should be a separate PR to reduce scope |
| 4 | High | BedrockAction / BedrockStreamingAction have heavy code duplication (~90% identical __init__ and request-building) |
| 5 | High | No graceful shutdown for the SQS consumer asyncio task — CancelledError not handled |
| 6 | Medium | tracking_mode should be a Python Enum, not magic strings "POLLING" / "SQS" |
| 7 | Medium | SQS consumer only processes `Records[0]`, silently ignoring additional records in a message; is this the intended behavior? |
| 8 | Medium | Tests are effectively no-ops when boto3 isn't installed (all wrapped in try/except ImportError) |
| 9 | Low | Import style (List, Dict vs list, dict), logging uses f-strings instead of %s-style |
See inline comments for details.
burr/tracking/server/backend.py
Outdated
```python
class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates via SQS.
```
Blocker — Vendor neutrality: This mixin is part of the core backend abstraction layer, but it leaks AWS-specific concepts. The class name is fine, but start_sqs_consumer names a specific AWS service, and the docstrings reference SQS throughout.
For an Apache project, this should be vendor-neutral:
- Rename `start_sqs_consumer()` → `start_event_consumer()`
- Update docstrings to describe generic event-driven updates, not SQS specifically
- The `is_event_driven()` method name is already good
This would allow a GCP Pub/Sub or Azure Event Grid implementation to use the same mixin without the naming being misleading.
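A vendor-neutral version of the mixin could look like the following sketch. This is illustrative, not the final API; the `start_event_consumer` / `is_event_driven` names follow the suggestion above, and `SQSEventBackend` is a hypothetical concrete class:

```python
import abc

class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates.

    Kept vendor-neutral: implementations may consume events from AWS SQS,
    GCP Pub/Sub, Azure Event Grid, or anything else.
    """

    def is_event_driven(self) -> bool:
        # Backends opt in by overriding this based on their configuration.
        return False

    @abc.abstractmethod
    async def start_event_consumer(self) -> None:
        """Run indefinitely, processing event notifications from the
        configured source and indexing new tracking data."""

class SQSEventBackend(EventDrivenBackendMixin):
    # All AWS-specific details live in the concrete backend, not the mixin.
    def is_event_driven(self) -> bool:
        return True

    async def start_event_consumer(self) -> None:
        ...  # poll the SQS queue here

assert SQSEventBackend().is_event_driven()
```

Because the abstract method stays generic, a Pub/Sub backend would subclass the same mixin without any misleading naming.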
```python
prior_snapshots_to_keep: int = 5
# BIP-0042: Event-driven tracking settings
tracking_mode: str = "POLLING"  # "POLLING" or "SQS" - POLLING is default for backward compatibility
sqs_queue_url: Optional[str] = None
```
Medium — Use an Enum instead of magic strings: "POLLING" and "SQS" are magic strings that are error-prone (e.g., typo "Polling" would silently fall through). Consider:
```python
from enum import Enum

class TrackingMode(str, Enum):
    POLLING = "POLLING"
    EVENT_DRIVEN = "EVENT_DRIVEN"
```

Also, "SQS" is an AWS-specific name. Since this setting is in the S3 backend (already AWS-specific), it's less critical than the core mixin, but "EVENT_DRIVEN" would still be more descriptive of the behavior rather than the implementation.
burr/tracking/server/s3/backend.py
Outdated
```python
async with self._session.create_client("sqs", region_name=self._sqs_region) as sqs_client:
    while True:
        try:
```
High — No graceful shutdown: This `while True` loop runs as a fire-and-forget `asyncio.create_task()` (see run.py). When the server shuts down, this task will be cancelled, but `asyncio.CancelledError` is not handled. The outer `except Exception` won't catch it (since Python 3.8, `CancelledError` inherits from `BaseException`, not `Exception`).
Recommendation:
```python
try:
    while True:
        ...
except asyncio.CancelledError:
    logger.info("SQS consumer shutting down")
    raise
```

Also, the task reference in run.py is not stored anywhere — it could be garbage collected. Store it and cancel it in the shutdown path of the lifespan context manager.
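A self-contained repro of the cancellation behavior may help here (illustrative names; `CancelledError` stopped being an `Exception` subclass in Python 3.8, so the `except Exception` arm never fires on cancel):

```python
import asyncio

async def consumer(log):
    try:
        while True:
            await asyncio.sleep(3600)  # stand-in for the SQS receive loop
    except Exception:
        log.append("caught by except Exception")  # never reached on cancel
    except asyncio.CancelledError:
        log.append("caught by except CancelledError")
        raise  # re-raise so the task is properly marked as cancelled

async def main():
    log = []
    task = asyncio.create_task(consumer(log))
    await asyncio.sleep(0)  # let the consumer start and suspend
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log

print(asyncio.run(main()))  # ['caught by except CancelledError']
```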
```python
logger.info(f"Indexed S3 event: {s3_key}")
except Exception as e:
    logger.error(f"Failed to handle S3 event {s3_key}: {e}")
```
Medium — Silent error swallowing: Catching Exception and only logging means data loss can go completely unnoticed. If indexing fails for a file, that tracking data is permanently skipped.
Consider either:
- Re-raising after logging (letting the message go back to SQS for retry, eventually to DLQ)
- At minimum, emitting a metric/counter so operators can alert on indexing failures
Currently the message gets deleted from SQS even if _handle_s3_event fails (the delete_message call is outside this try/except), so failed events are lost forever.
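One way to avoid losing failed events is to delete each message only after its handler succeeds, so SQS redelivers it after the visibility timeout (and a redrive policy can route repeated failures to a DLQ). A sketch with a stand-in client; the real code would use the aiobotocore SQS client, and `consume_once` / `FakeSQS` are hypothetical names:

```python
import asyncio

async def consume_once(sqs_client, queue_url, handle_event):
    """Process one receive batch; delete only messages whose handler succeeded."""
    resp = await sqs_client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=10
    )
    for msg in resp.get("Messages", []):
        try:
            await handle_event(msg["Body"])
        except Exception:
            # Leave the message in the queue: SQS redelivers it after the
            # visibility timeout, and a redrive policy can move it to a DLQ.
            continue
        await sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"]
        )

# Demo with a stand-in client (no AWS needed):
class FakeSQS:
    def __init__(self, messages):
        self._messages, self.deleted = messages, []
    async def receive_message(self, **kwargs):
        return {"Messages": self._messages}
    async def delete_message(self, QueueUrl, ReceiptHandle):
        self.deleted.append(ReceiptHandle)

async def handler(body):
    if body == "bad":
        raise ValueError("indexing failed")

fake = FakeSQS([{"Body": "ok", "ReceiptHandle": "r1"},
                {"Body": "bad", "ReceiptHandle": "r2"}])
asyncio.run(consume_once(fake, "queue-url", handler))
assert fake.deleted == ["r1"]  # the failed message stays for redelivery
```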
burr/integrations/bedrock.py
Outdated
```python
class BedrockStreamingAction(StreamingAction):
    """Streaming variant of BedrockAction using Converse Stream API."""
```
High — Code duplication: BedrockStreamingAction duplicates ~90% of BedrockAction:
- Identical `__init__` (all parameters, client creation, instance variable assignments)
- Identical `reads`, `writes`, `name` properties
- Identical request-building logic in `stream_run` vs `run_and_update`
Consider extracting a shared base class:
```python
class _BedrockBase:
    def __init__(self, model_id, input_mapper, reads, writes, ...):
        ...  # shared init

    def _build_request(self, state: State) -> Dict[str, Any]:
        ...  # shared request building

class BedrockAction(_BedrockBase, SingleStepAction): ...
class BedrockStreamingAction(_BedrockBase, StreamingAction): ...
```
and also move this to a different PR
burr/integrations/bedrock.py
Outdated
```python
config = Config(retries={"max_attempts": max_retries, "mode": "adaptive"})
self._client = boto3.client("bedrock-runtime", region_name=region, config=config)
```
Medium — Client created at construction time: The boto3 client is created eagerly in __init__, which means:
- The action can't be serialized/pickled (relevant for distributed execution)
- Testing requires mocking at construction time or having valid AWS credentials
- Credentials are captured at construction, not at invocation
Consider lazy client creation (create on first run_and_update call) or accepting a client/session as a parameter for testability.
I think any client should be passed in (i.e. injected) -- that way it's easier for tests etc.
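A sketch of that injection pattern, with a lazy-construction fallback for the no-injection case. The `client` / `client_factory` parameters and `StubClient` are hypothetical names, not the PR's actual API:

```python
from typing import Any, Callable, Optional

class BedrockAction:
    def __init__(
        self,
        model_id: str,
        client: Optional[Any] = None,
        client_factory: Optional[Callable[[], Any]] = None,
    ):
        self._model_id = model_id
        self._client = client  # an injected client wins
        self._client_factory = client_factory

    @property
    def client(self):
        # Lazy creation: no boto3 call (or AWS credentials) needed until
        # first use, and tests can inject a fake without patching __init__.
        if self._client is None:
            if self._client_factory is None:
                raise RuntimeError("no client injected and no factory configured")
            self._client = self._client_factory()
        return self._client

# Tests inject a stub; production passes a factory building the real client.
class StubClient:
    def converse(self, **kwargs):
        return {"output": "stubbed"}

action = BedrockAction("anthropic.claude-3", client=StubClient())
assert action.client.converse()["output"] == "stubbed"

lazy = BedrockAction("anthropic.claude-3", client_factory=StubClient)
assert isinstance(lazy.client, StubClient)  # created on first access
```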
```python
# BIP-0042: Start SQS consumer for event-driven tracking when configured
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
    asyncio.create_task(backend.start_sqs_consumer())
global initialized
```
High — Task reference not stored: asyncio.create_task() returns a task reference that isn't assigned to anything. This means:
- The task could be garbage collected
- There's no way to cancel it during shutdown
- Exceptions in the task will be silently lost
Store the task and cancel it in the shutdown path:
```python
sqs_task = None
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
    sqs_task = asyncio.create_task(backend.start_event_consumer())
yield
if sqs_task:
    sqs_task.cancel()
    try:
        await sqs_task
    except asyncio.CancelledError:
        pass
```

Also, `import asyncio` should be a top-level import, not inside the function.
terraform/.terraform.lock.hcl
Outdated
```
@@ -0,0 +1,45 @@
# This file is maintained automatically by "terraform init".
# Manual edits may be lost in future updates.
```
Blocker — Should not be committed: .terraform.lock.hcl is a generated file specific to the contributor's Terraform version and platform. It should be added to .gitignore (or the terraform .gitignore), not committed to the repo.
More broadly: the entire terraform/ directory (including dev.tfvars, prod.tfvars with hardcoded us-east-1) should live under examples/deployment/aws/ or contrib/aws/ rather than at the repo root. Burr is a Python framework library — shipping infrastructure-as-code at the top level makes the project appear AWS-first rather than vendor-neutral.
```python
assert BedrockAction is not None
except ImportError as e:
    assert "bedrock" in str(e).lower() or "boto3" in str(e).lower()
```
Medium — Tests are no-ops without boto3: Every test is wrapped in try/except ImportError with an assertion on the error message. When boto3 isn't installed (which is the case in CI unless explicitly added), all tests silently pass without testing anything.
Better approaches:
- Use `pytest.importorskip("boto3")` to clearly skip tests when the dependency is missing
- Mock `boto3` for unit tests so they run regardless of whether boto3 is installed
- For integration tests that need real AWS, mark them with `@pytest.mark.integration` and skip in CI
When this moves to a separate PR, you can follow what we do for the databases plugins and have explicit integration tests -- so we'd just remove the imports
burr/integrations/__init__.py
Outdated
```python
# under the License.

def __getattr__(name: str):
```
burr/tracking/__init__.py
Outdated
```python
def __getattr__(name: str):
    """Lazy load S3TrackingClient to avoid requiring boto3 unless used."""
    if name == "S3TrackingClient":
        from burr.tracking.s3client import S3TrackingClient

        return S3TrackingClient
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


__all__ = ["LocalTrackingClient", "S3TrackingClient"]
```
I don't think we need this?
```
@@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one
```
let's move these to an aws folder under terraform or something
Force-pushed c913d03 to 5ca9739. Commit messages:
- …he#664): Event-driven SQS telemetry (S3 notifications to SQS, near-instant updates); buffered S3 persistence (SpooledTemporaryFile fixes seek errors on large files); native BedrockAction and BedrockStreamingAction for Bedrock integration; Terraform module (S3, SQS, IAM) with dev/prod tfvars and tutorial
- … fixed build error
- This reverts commit 61673cd.
vaquarkhan
left a comment
Please review; all findings are fixed.
```python
def test_s3_settings_has_tracking_mode(self):
    """Verify tracking_mode field exists with POLLING default."""
    from burr.tracking.server.s3.backend import S3Settings, TrackingMode
```
can you move these to the top level please? we can then add pytest.importorskip if the dependencies aren't installed.
skrawcz
left a comment
Re-review after fixes
Thanks for the updates @vaquarkhan — the PR has improved significantly since the first review. Here's what's been addressed and what remains.
Previously requested items — now fixed
| # | Issue | Status |
|---|---|---|
| 1 | `start_sqs_consumer` in core mixin leaked AWS concepts | Fixed — renamed to `start_event_consumer`, docstrings are vendor-neutral |
| 2 | `terraform/` at repo root, `.terraform.lock.hcl` committed | Fixed — moved to `examples/deployment/aws/terraform/`, lock file removed |
| 3 | Bedrock integration bundled in same PR | Fixed — removed, now in separate PR #677 |
| 4 | No graceful shutdown for event consumer task | Fixed — task stored, cancelled on shutdown with CancelledError handling |
| 5 | `tracking_mode` should be Enum | Fixed — `TrackingMode(str, Enum)` with POLLING/EVENT_DRIVEN |
| 6 | Silent error swallowing in `_handle_s3_event` | Fixed — now re-raises after logging |
| 7 | Task reference not stored in `run.py` | Fixed — stored and cancelled in shutdown |
| 8 | `asyncio` import inside function | Fixed — now top-level |
| 9 | Lazy loading changes to `__init__.py` files | Fixed — removed (was Bedrock scope) |
Remaining items before merge
1. Linting failures (blocking)
- `black`: 2 files would be reformatted (`run.py`, `s3/backend.py`) — e.g., the class declaration on line 208 of `s3/backend.py` exceeds 100 chars
- `isort`: 3 files have incorrectly sorted imports (e.g., `from pydantic import field_validator` on line 36 not grouped with the pydantic import on line 34)
- `flake8`: F811 in `s3/backend.py:868` (duplicate `import asyncio` in the `__main__` block), F401 in `test_bip0042_s3_buffering.py:22` (unused `import pytest`)
Please run `pre-commit run --all-files` on the changed files.
2. Tests fail (blocking)
12 of 14 tests fail with ModuleNotFoundError: No module named 'aiobotocore'. Tests that import from the S3 backend need pytest.importorskip("aiobotocore") at the module level so they skip gracefully when the dependency isn't installed.
3. Tests are structural only — need behavioral tests (high)
Current tests only check that fields/methods exist (signature inspection). Please add tests that exercise actual logic:
- `TrackingMode` coercion: `S3Settings(tracking_mode="SQS")` → `EVENT_DRIVEN`
- `is_event_driven()` returns True/False based on config
- Message parsing for both EventBridge and S3 notification formats (mock the SQS client)
- `_query_s3_file` buffering behavior
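As an illustration, the `TrackingMode` coercion check could be a plain behavioral test. This sketch uses a simplified stand-in for the `S3Settings` validator (the real class needs aiobotocore installed); `coerce_tracking_mode` is a hypothetical helper, not the PR's API:

```python
from enum import Enum

class TrackingMode(str, Enum):
    POLLING = "POLLING"
    EVENT_DRIVEN = "EVENT_DRIVEN"

def coerce_tracking_mode(value: str) -> TrackingMode:
    # Stand-in for the S3Settings validator: the legacy "SQS" value
    # maps onto EVENT_DRIVEN; anything else must be a valid member.
    if value == "SQS":
        return TrackingMode.EVENT_DRIVEN
    return TrackingMode(value)

def test_legacy_sqs_value_coerces():
    assert coerce_tracking_mode("SQS") is TrackingMode.EVENT_DRIVEN

def test_polling_roundtrips():
    assert coerce_tracking_mode("POLLING") is TrackingMode.POLLING

# Behavioral rather than structural: these exercise the coercion logic itself.
test_legacy_sqs_value_coerces()
test_polling_roundtrips()
```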
4. Records[0] — only first record processed (medium)
In start_event_consumer line 803, S3→SQS notifications can batch multiple records per message but only Records[0] is processed. Should iterate over all records:
```python
elif "Records" in body:
    for record in body["Records"]:
        s3_key = record["s3"]["object"]["key"]
        event_time = datetime.datetime.fromisoformat(
            record["eventTime"].replace("Z", "+00:00")
        )
        if s3_key and s3_key.endswith(".jsonl"):
            await self._handle_s3_event(s3_key, event_time)
```

5. CI change (suggestion)
Adding tracking-server-s3 to the base test install in python-package.yml means all CI runs install aiobotocore/boto3. Consider keeping this in a separate test job (like test-persister-dbs) to avoid bloating the base test environment.
Overall the PR is in much better shape. Fixing the linting, making tests work and adding behavioral coverage should get this to merge-ready.
Hope all issues are now resolved.
[Short description explaining the high-level reason for the pull request]
Changes
BIP-042 issue #664
How I tested this
Deploy using terraform
Notes
Checklist